<?php


namespace App\Common;


use EasySwoole\Component\Singleton;
use EasySwoole\EasySwoole\Config;
use EasySwoole\Pool\Manager;
use Exception;
use PhpAmqpLib\Message\AMQPMessage;

class Rabbitmq
{
    use Singleton;

    /**
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    protected $rabbConn;

    public function __construct()
    {
        if (!$this->singleton()) {
            $rabbConf = Config::getInstance()->getConf('RABBITMQ');

            $rabb = Manager::getInstance()->get($rabbConf['pool_name']);

            $this->rabbConn = $rabb->getObj();
//            $redis->defer();
        }
    }

    private function singleton()
    {
        if (!$this->rabbConn) {
            return false;
        }
        return true;
    }


    /**
     * @param $exchangeName string 交换机名称
     * @param $type string 路由类型   direct精准推送; fanout 广播。推送到绑定到此交换机下的所有队列;  topic组播。比如上面我绑定的关键字是sms_send，那么他可以推送到*.sms_send的所有队列 headers这个目前不知道是如何推送的
     * @param $passive bool 默认false
     * @param $durable bool 交换机是否开启持久化
     * @param $auto_delete bool 通道关闭后是否删除交换机
     * 创建交换机 初始化交换机
     */
    public function exchangeDeclare($exchangeName, $type = 'direct', $passive = false, $durable = true, $auto_delete = false)
    {
        //声明初始化交换机 创建交换机
        $this->rabbConn->exchange_declare($exchangeName, $type, $passive, $durable, $auto_delete);

    }


    /**
     * @param string $queueName 队列机名称
     * @param bool $passive
     * @param bool $durable 队列是否开启持久化
     * @param bool $exclusive
     * @param bool $auto_delete 通道关闭后是否删除队列
     * 创建队列
     */
    public function queueDeclare($queueName, $passive = false, $durable = true, $exclusive = false, $auto_delete = false)
    {
        $this->rabbConn->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
    }

    /**
     * @param string $queueName 队列名称
     * @param string $exchangeName 交换机名称
     * @param string $routingKey 路由关键字
     * 队列绑定到交换机上
     */
    public function queueBind($queueName, $exchangeName, $routingKey)
    {
        //将队列与某个交换机进行绑定，并使用路由关键字
        $this->rabbConn->queue_bind($queueName, $exchangeName, $routingKey);
    }

    /**
     * @param $queueName string 交换机名
     * @param string $exchangeName //队列名称
     * @param string $routingKey 路由关键字
     * @param $callback \stdClass 消息回调处理类
     * @throws Exception
     * 消费
     */
    public function pop($queueName, $exchangeName = '', $routingKey = '', $callback)
    {

        //定义消息回调回调函数
        $callback = function ($msg) use ($callback) {
            new $callback($msg);
        };
        //告诉rabbitmq 一次不要发送过多消息 处理完了在发送
        $this->rabbConn->basic_qos(null, 1, null);
        //获取消费消息
        $this->rabbConn->basic_consume($queueName, '', false, true, false, false, $callback);
        //循环消费
        while (count($this->rabbConn->callbacks)) {
            $this->rabbConn->wait();
        }
        $this->close();//关闭链接
    }

    /**
     * @param array $msgBody 投递的消息
     * @param $queueName string 队列消息
     * @param $exchangeName string 交换机名
     * @param $routingKey string 路由关键字(也可以省略)
     * @throws Exception
     * 生产
     */
    public function push(array $msgBody,$queueName,$exchangeName,$routingKey)
    {
        //创建交换机   type类型 direct 可以不创建交换机使用默认也可以
        $this->rabbConn->exchangeDeclare($exchangeName, 'direct');
        // 创建队列
        $this->queueDeclare($queueName);
        //绑定队列在交换机上 必须绑定
        $this->queueBind($queueName, $exchangeName, $routingKey);
        $msg = new AMQPMessage(json_encode($msgBody), ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); //生成消息
        $this->rabbConn->basic_publish($msg, $exchangeName, $routingKey); //推送消息到某个交换机
        $this->close();//关闭链接
    }

    /**
     * @throws Exception
     * 关闭链接
     */
    public function close()
    {
        $this->rabbConn->close();
    }

    public function conn()
    {
        return $this->rabbConn;
    }
}
